#include "db_mgr.h"

#include <sstream>

#include <cppconn/driver.h>
#include <cppconn/exception.h>
#include <cppconn/warning.h>
#include <cppconn/metadata.h>
#include <cppconn/prepared_statement.h>
#include <cppconn/resultset.h>
#include <cppconn/resultset_metadata.h>
#include <cppconn/statement.h>
#include <mysql_driver.h>
#include <mysql_connection.h>

#include <common/log/log.h>

#include "db_cfg.h"


DBMgr::DBMgr()
{
    cfg_ = std::make_shared<DBCfg>();
}

DBMgr::~DBMgr()
{
    mysqlConn_->close();
}

bool DBMgr::init()
{

    if (!cfg_->init())
    {
        LOG_ERROR("DB CONFIG INIT FAILED.");
        return false;
    }

    try
    {
        sql::Driver * driver = sql::mysql::get_driver_instance();
        if (!driver)
        {
            LOG_ERROR("get_mysql_driver_instance FAILED.");
            return false;
        }
        const DBConfigInfo& dbInfo = cfg_->getDBInfo();
        std::stringstream ss;
        ss << dbInfo.addr << ":" << dbInfo.port;
        mysqlConn_.reset(driver->connect(ss.str().c_str(), dbInfo.user.c_str(), dbInfo.passwd.c_str()));
        if (!mysqlConn_)
        {
            LOG_ERROR("No sql connected.");
            return false;
        }

        // create database
        std::unique_ptr<sql::Statement> stmt(mysqlConn_->createStatement());
        stmt->execute(dbInfo.sqlSchema);

        // create `tables` table
        mysqlConn_->setSchema(dbInfo.dbname);
        stmt->execute(dbInfo.sqlVersion);

        // got every table version info from `tables` table
        std::unique_ptr<sql::ResultSet> res(stmt->executeQuery(" select tablename, version from `tables` "));
        std::map<std::string, uint32_t> versionInfos;
        while (res->next())
        {
            versionInfos[res->getString(1)] = res->getInt(2);
        }

        // create and update all table defined in db.xml
        auto getTableVersion = [&versionInfos](const std::string& tableName)->int32_t
        {
            auto it = versionInfos.find(tableName);
            return it == versionInfos.end() ? 0 : it->second;
        };

        for (auto& it : dbInfo.sqlTables)
        {
            int32_t version = getTableVersion(it.first);

            for (auto& itv : it.second)
            {
                if (itv.first <= version)
                {
                    continue;
                }

                // LOG_DEBUG(std::endl << itv.second << std::endl;);
                stmt->execute(itv.second);

                std::unique_ptr<sql::PreparedStatement> prepStmt(mysqlConn_->prepareStatement("replace into `tables` (`tablename`,`version`) values ( ?, ? )"));
                prepStmt->setString(1, it.first);
                prepStmt->setInt(2, itv.first);

                if (prepStmt->executeUpdate() <= 0)
                {
                    LOG_ERROR("Update table version failed, table: " << it.first << " version: " << itv.first);
                    return false;
                }
            }
        }
    }
    catch (sql::SQLException &e)
    {
        LOG_ERROR("# ERR: " << e.what()
                  << " (MySQL error code: " << e.getErrorCode()
                  << ", SQLState: " << e.getSQLState() << " )");
        return false;
    }
    return true;
}


bool DBMgr::executeSql(const std::string& sql)
{
    std::vector<std::string> sqls;
    sqls.push_back(sql);
    return executeSql(sqls);
}

bool DBMgr::executeSql(const std::vector<std::string>& sqls)
{
    if (mysqlConn_->isClosed())
    {
        if (!mysqlConn_->reconnect())
        {
            LOG_ERROR("DB CONNECTIN IS CLOSED.");
            return false;
        }
    }

    std::lock_guard<std::mutex> lock(mutex_);

    bool isAuto = mysqlConn_->getAutoCommit();
    mysqlConn_->setAutoCommit(false);

    try
    {
        for (const auto& sql : sqls)
        {
            std::unique_ptr<sql::Statement> stmt(mysqlConn_->createStatement());
            stmt->execute(sql);
            LOG_INFO("Execute sql suc: " << sql);
        }
    }
    catch (sql::SQLException &e)
    {
        LOG_ERROR("# ERR: " << e.what()
                  << " (MySQL error code: " << e.getErrorCode()
                  << ", SQLState: " << e.getSQLState() << " )");

        mysqlConn_->rollback();
        mysqlConn_->setAutoCommit(isAuto);
        return false;
    }
    mysqlConn_->commit();
    mysqlConn_->setAutoCommit(isAuto);
    return true;
}

// bool createTableQuerySql(const proto::db::TableInfo& tableInfo,
//                          const google::protobuf::Message *msg,
//                          const google::protobuf::Descriptor *des,
//                          std::string& sql)
// {
//     auto ref = msg->GetReflection();

//     // select fieldname1, fieldname2 from tablename where id = 1
//     std::stringstream ss;
//     ss << "select " ;
//     std::stringstream sss, ssw;
//     ssw << " from " << des->name() << " where ";
//     if (des->field_count() == 0)
//     {
//         return true;
//     }

//     bool firstset = true, firstwhere = true;
//     for (int i = 0; i < des->field_count(); ++i)
//     {
//         auto field = des->field(i);
//         if (!isSupportFieldType(ref, msg, field))
//         {
//             LOG_ERROR("field type not support, table:" << tableInfo.table_proto_name()
//                       << " field:" << field->name());
//             return false;
//         }

//         if (!ref->HasField(*msg, field))
//         {
//             if (field->is_required())
//             {
//                 LOG_ERROR("required field not set, table:" << tableInfo.table_proto_name()
//                           << " field:" << field->name());
//                 return false;
//             }
//             else
//             {
//                 continue;
//             }
//         }
//         else
//         {
//             if (field->is_required())
//             {
//                 if (!firstwhere)
//                 {
//                     ssw << " , ";
//                 }
//                 ssw << field->name() << " = " << getFieldAsString(ref, msg, field) << " ";
//                 firstwhere = false;
//             }
//             else
//             {
//                 if (!firstset)
//                 {
//                     sss << " , ";
//                 }
//                 sss << field->name() << " ";
//                 firstset = false;
//             }
//         }
//     }
//     ss << sss.str() << ssw.str();
//     LOG_DEBUG("execute sql:" << ss.str());
//     sql = std::move(ss.str());
//     return true;
// };


// bool queryTable(const proto::db::TableInfo& tableInfo,
//                 const google::protobuf::Message *msg,
//                 const google::protobuf::Descriptor *des,
//                 TableRetInfo& retInfo)
// {
//     std::string sql;
//     if (!createTableQuerySql(tableInfo, msg, des, sql))
//     {
//         LOG_ERROR("create sql failed.");
//         return false;
//     }

//     if (mysqlConn_->isClosed())
//     {
//         if (!mysqlConn_->reconnect())
//         {
//             LOG_ERROR("DB CONNECTIN IS CLOSED.");
//             return false;
//         }
//     }
//     bool isAuto = mysqlConn_->getAutoCommit();
//     mysqlConn_->setAutoCommit(false);

//     try
//     {
//         std::unique_ptr<sql::Statement> stmt(mysqlConn_->createStatement());
//         std::unique_ptr<sql::ResultSet> rst = stmt->executeQuery(sql);

//         if (rst->rowsCount() <= 0)
//         {
//             return true;
//         }
//         retInfo.set_table_proto_name(des->name());

//         while (rst->next())
//         {
//             // retInfo.add_proto_buffers();
//         }

//         LOG_INFO("Execute sql suc: " << sql);
//     }
//     catch (sql::SQLException &e)
//     {
//         LOG_ERROR("# ERR: " << e.what()
//                   << " (MySQL error code: " << e.getErrorCode()
//                   << ", SQLState: " << e.getSQLState() << " )");

//         mysqlConn_->rollback();
//         mysqlConn_->setAutoCommit(isAuto);
//         return false;
//     }
//     mysqlConn_->commit();
//     mysqlConn_->setAutoCommit(isAuto);

// }
